"""Common cki_tools helpers NOT for external consumption."""

import argparse
import collections
from functools import cached_property
import os

import boto3
import botocore

# Passed as max_pool_connections to the botocore client configuration.
MAX_POOL_CONNECTIONS = 10


# Parsed bucket specification as produced by parse_bucket_spec().
BucketSpec = collections.namedtuple(
    'BucketSpec',
    ('endpoint', 'access_key', 'secret_key', 'bucket', 'prefix'),
)


def parse_bucket_spec(bucket_spec) -> BucketSpec:
    """Turn a deployment-all-style bucket specification into a BucketSpec.

    The specification must consist of exactly five '|'-separated fields in
    the order endpoint, access key, secret key, bucket and path; any other
    number of fields raises ValueError from the unpacking.

    Trailing slashes are removed from the endpoint and the bucket. An empty
    endpoint falls back to 'http://s3.amazonaws.com'. A non-empty path is
    normalized to end in exactly one slash, an empty path stays empty.
    """
    raw_endpoint, key_id, secret, raw_bucket, raw_path = bucket_spec.split('|')

    if raw_path:
        prefix = raw_path.rstrip('/') + '/'
    else:
        prefix = ''

    return BucketSpec(
        endpoint=raw_endpoint.rstrip('/') or 'http://s3.amazonaws.com',
        access_key=key_id,
        secret_key=secret,
        bucket=raw_bucket.rstrip('/'),
        prefix=prefix,
    )


class StoreBucketSpec(argparse.Action):
    # pylint: disable=too-few-public-methods
    """Argparse action resolving a bucket spec through the environment.

    The command line value is not the bucket spec itself, but the name of
    an env variable that contains the bucket spec.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Store the parsed bucket spec found in the env variable `values`.

        Raises KeyError if the env variable is not set.
        """
        raw_spec = os.environ[values]
        parsed_spec = parse_bucket_spec(raw_spec)
        setattr(namespace, self.dest, parsed_spec)


class S3Bucket:
    """S3 bucket spec together with lazily created boto3 handles.

    The boto3 resource and client are only created on first access of the
    `bucket` and `client` properties and are cached afterwards.
    """

    def __init__(self, bucket_spec: BucketSpec):
        """Create a new instance.

        Will use unsigned requests if the bucket spec does not contain
        credentials and no credentials are available via an IAM role.

        The keyword arguments shared by the boto3 resource and client are
        stored in `self.kwargs`; empty spec fields are passed as None.
        """
        self.spec = bucket_spec
        config_kwargs = {'max_pool_connections': MAX_POOL_CONNECTIONS}
        if not self.spec.access_key:
            # The default session is only needed to find out where boto3
            # would take its credentials from.
            if boto3.DEFAULT_SESSION is None:
                boto3.setup_default_session()
            credentials = boto3.DEFAULT_SESSION.get_credentials()
            if not credentials or credentials.method != 'iam-role':
                # Hand the option to the Config constructor instead of
                # assigning the attribute afterwards: botocore only treats
                # constructor arguments as user-provided options, so an
                # attribute set later is lost when configs are merged.
                config_kwargs['signature_version'] = botocore.UNSIGNED
        self.kwargs = {
            'aws_access_key_id': self.spec.access_key or None,
            'aws_secret_access_key': self.spec.secret_key or None,
            'endpoint_url': self.spec.endpoint or None,
            'config': botocore.client.Config(**config_kwargs),
        }

    @classmethod
    def from_bucket_string(cls, bucket_spec: str) -> 'S3Bucket':
        """Initialize S3Bucket from bucket_spec as a string.

        The string is parsed with parse_bucket_spec().
        """
        spec = parse_bucket_spec(bucket_spec)
        return cls(spec)

    @cached_property
    def bucket(self):
        """Get s3 bucket.

        Returns the boto3 Bucket resource for `self.spec.bucket`, created
        on first access.
        """
        resource = boto3.resource('s3', **self.kwargs)
        return resource.Bucket(self.spec.bucket)

    @cached_property
    def client(self) -> botocore.client.BaseClient:
        """Get s3 bucket client.

        Returns the boto3 S3 client built from `self.kwargs`, created on
        first access.
        """
        return boto3.client('s3', **self.kwargs)
